# pylint: disable=missing-function-docstring, missing-module-docstring, invalid-name, import-outside-toplevel
import codecs
import contextlib
import multiprocessing
import subprocess
import tarfile
import tempfile
import threading
import urllib
import zipfile
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def test_codecs_open():
    fh = codecs.open("test.txt", "utf8")  # [consider-using-with]
    fh.close()


def test_urlopen():
    _ = urllib.request.urlopen("http://www.python.org")  # [consider-using-with]


def test_temporary_file():
    _ = tempfile.TemporaryFile("r")  # ambiguous with NamedTemporaryFile


def test_named_temporary_file():
    _ = tempfile.NamedTemporaryFile("r")  # [consider-using-with]


def test_spooled_temporary_file():
    _ = tempfile.SpooledTemporaryFile("r")  # [consider-using-with]


def test_temporary_directory():
    _ = tempfile.TemporaryDirectory()  # [consider-using-with]


def test_zipfile():
    myzip = zipfile.ZipFile("spam.zip", "w")  # [consider-using-with]
    _ = myzip.open("eggs.txt")  # [consider-using-with]


def test_pyzipfile():
    myzip = zipfile.PyZipFile("spam.zip", "w")  # [consider-using-with]

    with zipfile.PyZipFile("spam.zip", "w"):  # must not trigger
        pass

    _ = myzip.open("eggs.txt")  # [consider-using-with]

    with myzip.open("eggs.txt"):  # must not trigger
        pass


def test_tarfile():
    tf = tarfile.open("/tmp/test.tar", "w")  # [consider-using-with]
    tf.close()

    with tarfile.open("/tmp/test.tar", "w"):  # must not trigger
        pass

    tf = tarfile.TarFile("/tmp/test2.tar", "w")  # [consider-using-with]
    tf.close()

    with tarfile.TarFile("/tmp/test2.tar", "w"):  # must not trigger
        pass


def test_lock_acquisition():
    lock = threading.Lock()
    lock.acquire()  # [consider-using-with]
    lock.release()

    with lock:  # must not trigger
        pass

    rlock = threading.RLock()
    rlock.acquire()  # [consider-using-with]
    rlock.release()

    with rlock:  # must not trigger
        pass

    sema = threading.Semaphore()
    sema.acquire()  # [consider-using-with]
    sema.release()

    with sema:  # must not trigger
        pass

    bounded_sema = threading.BoundedSemaphore()
    bounded_sema.acquire()  # [consider-using-with]
    bounded_sema.release()

    with bounded_sema:  # must not trigger
        pass


@contextlib.contextmanager
def test_lock_acquisition_in_context_manager1():
    """
    The message must not be triggered if the resource allocation is done inside a context manager.
    """
    lock = threading.Lock()
    lock.acquire()  # must not trigger
    yield
    lock.release()


class MyLockContext:
    """
    The message must not be triggered if the resource allocation is done inside a context manager.
    """

    def __init__(self):
        self.lock = threading.Lock()

    def __enter__(self):
        self.lock.acquire()  # must not trigger

    def __exit__(self, exc_type, exc_value, traceback):
        self.lock.release()


def test_multiprocessing():
    # the different Locks provided by multiprocessing would be candidates
    # for consider-using-with as well, but they lead to InferenceErrors.
    _ = multiprocessing.Pool()  # [consider-using-with]
    with multiprocessing.Pool():
        pass

    manager = multiprocessing.managers.BaseManager()
    manager.start()  # [consider-using-with]
    with multiprocessing.managers.BaseManager():
        pass

    manager = multiprocessing.managers.SyncManager()
    manager.start()  # [consider-using-with]
    with multiprocessing.managers.SyncManager():
        pass


def test_popen():
    _ = subprocess.Popen("sh")  # [consider-using-with]
    with subprocess.Popen("sh"):
        pass


def test_suppress_in_exit_stack():
    """Regression test for issue #4654 (false positive)"""
    with contextlib.ExitStack() as stack:
        _ = stack.enter_context(
            open("/sys/firmware/devicetree/base/hwid,location", "r", encoding="utf-8")
        )  # must not trigger


def test_futures():
    """
    Regression test for issue #4689.
    ThreadPoolExecutor and ProcessPoolExecutor were formerly part of the callables that raised
    the R1732 message if used outside a with block, but there are legitimate use cases where
    Executor instances are used e.g. as a persistent background worker pool throughout the program.
    """
    thread_executor = ThreadPoolExecutor()
    thread_executor.submit(print, 1)
    process_executor = ProcessPoolExecutor()
    process_executor.submit(print, 2)
    thread_executor.shutdown()
    process_executor.shutdown()


pool = multiprocessing.Pool()  # must not trigger, as it is used later on
with pool:
    pass


global_pool = (
    multiprocessing.Pool()
)  # must not trigger, will be used in nested scope


def my_nested_function():
    with global_pool:
        pass


# this must also work for tuple unpacking
pool1, pool2 = (
    multiprocessing.Pool(),  # must not trigger
    multiprocessing.Pool(),  # must not trigger
)

with pool1:
    pass

with pool2:
    pass

unused_pool1, unused_pool2 = (
    multiprocessing.Pool(),  # [consider-using-with]
    multiprocessing.Pool(),  # [consider-using-with]
)

used_pool, unused_pool = (
    multiprocessing.Pool(),  # must not trigger
    multiprocessing.Pool(),  # [consider-using-with]
)
with used_pool:
    pass

unused_pool, used_pool = (
    multiprocessing.Pool(),  # [consider-using-with]
    multiprocessing.Pool(),  # must not trigger
)
with used_pool:
    pass


def test_subscript_assignment():
    """
    Regression test for issue https://github.com/PyCQA/pylint/issues/4732.
    If a context manager is assigned to a list or dict, we are not able to
    tell if / how the context manager is used later on, as it is not assigned
    to a variable or attribute directly.
    In this case we can only emit the message directly.
    """
    job_list = [None, None]
    job_list[0] = subprocess.Popen("ls")  # [consider-using-with]
    job_dict = {}
    job_dict["myjob"] = subprocess.Popen("ls")  # [consider-using-with]
